package org.databandtech.sparkstreaming;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;

public class App
{
    /**
     * Structured Streaming word count: reads lines from a socket on
     * localhost:9999, splits them into words, and prints a running count of
     * each word to the console in "update" output mode.
     *
     * <p>Run a feeder first (e.g. {@code nc -lk 9999}) before starting this job.
     *
     * @param args unused
     * @throws TimeoutException        if the streaming query cannot be started
     * @throws StreamingQueryException if the query terminates with an error
     */
    public static void main( String[] args ) throws StreamingQueryException, TimeoutException
    {
        System.out.println( "Hello World!" );

        SparkSession spark = SparkSession
            .builder()
            .appName("JavaStructuredNetworkWordCount")
            .getOrCreate();

        // DataFrame representing the stream of input lines from localhost:9999.
        Dataset<Row> lines = spark
            .readStream()
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load();

        // Split each line into whitespace-separated words (one row per word).
        Dataset<String> words = lines
            .as(Encoders.STRING())
            .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
                Encoders.STRING());

        // Running count grouped by the word itself ("value" is the implicit
        // single-column name of a Dataset<String>).
        Dataset<Row> wordCounts = words.groupBy("value").count();

        // Start the query that prints the running counts to the console.
        // Checked exceptions now propagate out of main (JVM prints the stack
        // trace and exits non-zero) instead of being swallowed by TODO stubs.
        StreamingQuery query = wordCounts.writeStream()
            .outputMode("update")
            .format("console")
            .start();

        // NOTE(review): lastProgress() is null until the first micro-batch
        // completes, so this prints "null" here — kept for parity with the
        // original; consider removing or polling it from another thread.
        System.out.println(query.lastProgress());

        // Block until the query is stopped or fails.
        query.awaitTermination();
    }
}
